Dataflow SQL で Pub/Sub のストリームデータと BigQuery のテーブルデータを結合してみた
こんにちは、みかみです。
Dataflow SQL を使うと、BigQuery 管理コンソールのクエリエディタに入力した SQL を、簡単にジョブ実行できるそうです。
BigQuery テーブルに対する SQL のジョブ実行はもちろん、データソースには Pub/Sub のストリームデータや GCS のファイルデータも指定できるそうなので、試してみました。
やりたいこと
- Dataflow SQL をさわってみたい
- Dataflow SQL でジョブ実行するにはどうすればよいのか知りたい
- Dataflow SQL で Pub/Sub のストリーミングデータと BigQuery テーブルデータを結合してみたい
前提
公式ドキュメントの以下の処理を実際に動かしてみます。
動作確認には Cloud Shell を使用するため、Google Cloud SDK のインストールやサービスアカウントの設定は省略します。
また、Dataflow SQL ジョブ実行に必要な各種 API は有効化済みです。
Pub/Sub トピックとデータ送信スクリプトを準備
動作確認に使用するストリームデータを送信するスクリプトと Pub/Sub トピックを作成します。
gcloud pubsub
コマンドで transactions
トピックを作成しました。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gcloud pubsub topics create transactions Created topic [projects/cm-da-mikami-yuki-258308/topics/transactions].
また、ドキュメントに記載の 1 ~ 5 秒間隔で先ほど作成した Pub/Sub トピックにメッセージをパブリッシュする以下の Python コードを transactions_injector.py
というファイル名で保存しました。
import datetime, json, os, random, time # Set the `project` variable to a Google Cloud project ID. project = 'cm-da-mikami-yuki-258308' FIRST_NAMES = ['Monet', 'Julia', 'Angelique', 'Stephane', 'Allan', 'Ulrike', 'Vella', 'Melia', 'Noel', 'Terrence', 'Leigh', 'Rubin', 'Tanja', 'Shirlene', 'Deidre', 'Dorthy', 'Leighann', 'Mamie', 'Gabriella', 'Tanika', 'Kennith', 'Merilyn', 'Tonda', 'Adolfo', 'Von', 'Agnus', 'Kieth', 'Lisette', 'Hui', 'Lilliana',] CITIES = ['Washington', 'Springfield', 'Franklin', 'Greenville', 'Bristol', 'Fairview', 'Salem', 'Madison', 'Georgetown', 'Arlington', 'Ashland',] STATES = ['MO','SC','IN','CA','IA','DE','ID','AK','NE','VA','PR','IL','ND','OK','VT','DC','CO','MS', 'CT','ME','MN','NV','HI','MT','PA','SD','WA','NJ','NC','WV','AL','AR','FL','NM','KY','GA','MA', 'KS','VI','MI','UT','AZ','WI','RI','NY','TN','OH','TX','AS','MD','OR','MP','LA','WY','GU','NH'] PRODUCTS = ['Product 2', 'Product 2 XL', 'Product 3', 'Product 3 XL', 'Product 4', 'Product 4 XL', 'Product 5', 'Product 5 XL',] while True: first_name, last_name = random.sample(FIRST_NAMES, 2) data = { 'tr_time_str': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'first_name': first_name, 'last_name': last_name, 'city': random.choice(CITIES), 'state':random.choice(STATES), 'product': random.choice(PRODUCTS), 'amount': float(random.randrange(50000, 70000)) / 100, } # For a more complete example on how to publish messages in Pub/Sub. # https://cloud.google.com/pubsub/docs/publisher message = json.dumps(data) command = "gcloud --project={} pubsub topics publish transactions --message='{}'".format(project, message) print(command) os.system(command) time.sleep(random.randrange(1, 5))
BigQuery のデータセットとテーブルを作成してテストデータをロード
Pub/Sub のストリームデータと結合する BigQuery のテーブルデータを準備します。
ドキュメントに記載のある以下のサンプルデータを CSV ファイルに保存して、GCS にアップロードしました。
state_id,state_code,state_name,sales_region 1,MO,Missouri,Region_1 2,SC,South Carolina,Region_1 3,IN,Indiana,Region_1 6,DE,Delaware,Region_2 15,VT,Vermont,Region_2 (省略) 45,KY,Kentucky,Region_8 53,WI,Wisconsin,Region_8 57,OH,Ohio,Region_8 49,VI,United States Virgin Islands,Region_9 62,MP,Commonwealth of the Northern Mariana Islands,Region_9
以下の bq
コマンドで、データセットを作成した後、スキーマ自動検出でテーブルを新規作成してサンプルデータをロードします。
gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ bq --location=asia-northeast1 mk -d \ > dataflow_sql_dataset Dataset 'cm-da-mikami-yuki-258308:dataflow_sql_dataset' successfully created.
gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ bq load \ > --autodetect \ > --source_format=CSV \ > dataflow_sql_dataset.us_state_salesregions \ > gs://test-mikami-dataflow/us_state_salesregions.csv Waiting on bqjob_r66199b966b89e403_00000175495375ef_1 ... (1s) Current status: DONE
データセットの作成と新規テーブルにサンプルデータがロードできました。
BigQuery リソースに Pub/Sub トピックを追加してスキーマを定義
Dataflow 管理コンソール上部の「SQL からジョブを実行」をクリックし、Dataflow エンジンに変換済みの BigQuery 管理画面に遷移します。
BigQuery 管理コンソール「その他」のプルダウンから、「クエリの設定」で「クエリエンジン」を「Cloud Dataflow エンジン」に変更でも大丈夫です。
ナビゲーションパネル「リソース」の右横の「データを追加」リンクをクリックし、「Cloud Dataflow のソース」を選択します。
「Cloud Pub/Sub トピック」のラジオボタンが ON になっていることを確認して、transactions
トピックにチェックして「追加」をクリックします。
ナビゲーションメニュー「リソース」に「Cloud Pub/Sub トピック」の「transactions」が追加されたことを確認して、「スキーマを編集」ボタンをクリック。 スキーマ編集画面で「テキストとして編集」チェックを ON に変更後、テキストボックスにドキュメントに記載のスキーマをペーストして「送信」します。
transactions
トピックのスキーマが定義できました。
Dataflow SQL ジョブを作成して実行
ドキュメント記載の SQL を BigQuery 管理コンソールクエリエディタにペーストしてクエリを検証したら、「Cloud Dataflow ジョブを作成」ボタンをクリックします。
「リージョンエンドポイント」を europe-west4
に変更し、「Destination」の「出力タイプ」で「BigQuery」を選択。
「データセットID」と「テーブル名」を入力して「作成」します。
データ送信用のスクリプトを実行してしばらく待ちます。
gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ python3 transactions_injector.py gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:52", "first_name": "Kennith", "last_name": "Leighann", "city ": "Ashland", "state": "LA", "product": "Product 2 XL", "amount": 522.98}' messageIds: - '1668558289758654' gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:54", "first_name": "Hui", "last_name": "Tonda", "city": "Gre enville", "state": "MN", "product": "Product 5", "amount": 536.55}' messageIds: - '1668558415047767' gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:57", "first_name": "Ulrike", "last_name": "Monet", "city": " Salem", "state": "TX", "product": "Product 3 XL", "amount": 641.26}' messageIds: - '1668573944654240'
ジョブ実行状況は、Dataflow 管理画面からも確認できます。
5分ほど経ってから、BigQuery のテーブルを確認してみました。
期待通り、Dataflow SQL のジョブで実行した SQL の結果が BigQuery のテーブルに格納されていることが確認できました。
つまずいたところ
Dataflow SQL ジョブ実行時、NullPointerException のエラー終了が発生しました。。
Pub/Sub トピックのスキーマを変更したり、location を変更したり、切り分けのためにソースデータを GCS にして同様の処理を実行したり、一般公開の Pub/Sub に変更してみたりしましたが、原因分からず。。
- 一般的なエラーのガイダンス | Dataflow ドキュメント
- トラブルシューティングとデバッグ | Dataflow ドキュメント
- SQL を使用したクイックスタート | Dataflow ドキュメント
結局、下記投稿の Answer で解決しました!(どうもありがとうございますmm
2020/10/23 現在、asia-northeast1 および us-central1 のリージョンでは、Pub/Sub ストリームデータをソースとした Dataflow SQL ジョブの正常終了は確認できませんでしたが、 同じ手順で europe-west4 で実行すると正常に実行できることが確認できました。
他リージョンでも近々対応してもらえるのではないかと思います。
まとめ(所感)
2020/10 現在、Dataflow SQL のデータソースとして利用できるのは以下の 3 つです。
- Pub/Sub トピック
- Cloud Storage ファイルセット
- BigQuery テーブル
また、Dataflow SQL で実行したクエリの出力先は、BigQuery または Pub/Sub トピックのどちらかです。
Dataflow SQL を使えば、Pub/Sub のストリームデータや GCS 上のファイルデータをロードすることなく、BigQuery のテーブルデータと結合できるので便利だと思いました。
また、Dataflow SQL の実行手順も、ソースデータの指定方法などの若干の考慮は必要ですが、通常通り BigQuery 管理コンソールでクエリエディタに入力する感覚で気軽にジョブ実行できるので簡単でした。
今後もデータソースや出力先の追加など、より便利になるアップデートを心待ちにしております!